pub mod api;
pub mod worker_util;
use actix_web::rt;
use actix_web::{dev::PeerAddr, error, web, Error, HttpRequest, HttpResponse};
use awc::Client;
use qstring::QString;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
use url::Url;
use worker_util::{ScriptWorkerId, WorkerPort, PORT_TABLE};
///websocket转发
/// 此函数是用于代理 WebSocket 请求的异步处理程序。 它接受 HTTP 请求和有效负载流作为输入。
//   该函数首先从请求 URI 中提取查询参数并检索“product_code”参数。 如果参数为空，则返回“未找到产品代码”响应。
//   接下来，它在全局端口表中查找给定产品代码的相应端口。 如果未找到端口，则返回“未找到服务”响应。
//   然后，它通过将主机替换为“127.0.0.1”并根据客户端请求设置路径和查询参数来构造目标 URL。
//   之后，它使用 reqwest 库创建一个新的 HTTP GET 请求，并设置客户端请求的标头。
//   将目标请求发送到目标 URL，并检查响应状态。 如果状态不是 101（切换协议），则会返回错误，指示目标未回复预期的升级响应。
//   如果目标响应有效，该函数将创建状态为 101 的客户端响应并将其升级为 WebSocket 响应。 它将标头从目标响应复制到客户端响应。
//   然后，它将目标升级分成单独的读写两半。
//   在一个单独的任务中，它设置一个管道将客户端的流代理到目标的写入部分。 记录代理过程中遇到的任何错误。
//   最后，它从读取的一半创建一个目标流，并返回以目标流作为正文的客户端响应。
pub async fn proxy_ws_request(client_req: HttpRequest, client_stream: web::Payload) -> Result<HttpResponse, Box<dyn std::error::Error>> {
  let query = client_req.uri().query();
  let qs = QString::from(query.unwrap());
  let product_code: &str = qs.get("product_code").unwrap_or("");
  if product_code.is_empty() {
    return Ok(HttpResponse::NotFound().body("product_code not found"));
  }
  let id = ScriptWorkerId(product_code.to_string());
  let hand_port = PORT_TABLE.read().unwrap();
  let WorkerPort(port) = match hand_port.get(&id) {
    Some(p) => p,
    None => {
      return Ok(HttpResponse::NotFound().body(format!("{} service not found", product_code)));
    }
  };
  let mut target_url = Url::parse(&format!("http://127.0.0.1:{}", port)).unwrap();
  target_url.set_path(client_req.uri().path());
  target_url.set_query(client_req.uri().query());
  // Forward the request.
  let mut req = reqwest::ClientBuilder::new().build().unwrap().get(target_url);
  for (key, value) in client_req.headers() {
    req = req.header(key, value);
  }
  let target_response = req.send().await.unwrap();

  let status = target_response.status().as_u16();
  if status != 101 {
    return Err(Box::new(std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "Target did not reply with 101 upgrade")));
  }
  // 复制所有的header
  let mut client_response = HttpResponse::SwitchingProtocols();
  client_response.upgrade("websocket");
  for (header, value) in target_response.headers() {
    client_response.insert_header((header.to_owned(), value.to_owned()));
  }
  let target_upgrade = target_response.upgrade().await?;
  let (target_rx, mut target_tx) = tokio::io::split(target_upgrade);
  rt::spawn(async move {
    let mut client_stream = client_stream.map(|result| result.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)));
    let mut client_read = tokio_util::io::StreamReader::new(&mut client_stream);
    let result = tokio::io::copy(&mut client_read, &mut target_tx).await;
    if let Err(err) = result {
      println!("Error proxying websocket client bytes to target: {err}")
    }
  });

  let target_stream = tokio_util::io::ReaderStream::new(target_rx);
  Ok(client_response.streaming(target_stream))
}
///路由转发
/// 这是一个异步函数，它将 HTTP 请求转发到指定的工作线程。
/// 它接受 HTTP 请求、有效负载、对等地址和客户端数据对象作为参数。
/// 它首先从查询字符串或标头中提取产品代码，然后从全局端口表中检索相应的工作端口。
/// 它根据检索到的端口以及原始请求路径和查询字符串构造一个新的 URL。
/// 然后，它使用客户端对象将请求发送给新的工作线程，并将对等地址添加到“x-forwarded-for”标头（如果提供）中。
/// 最后，它根据从工作线程收到的响应构建并返回 HTTP 响应。
pub async fn forward(req: HttpRequest, payload: web::Payload, peer_addr: Option<PeerAddr>, client: web::Data<Client>) -> Result<HttpResponse, Error> {
  let query = req.uri().query();
  let qs = QString::from(query.unwrap_or_default());
  let mut product_code: &str = qs.get("product_code").unwrap_or("");
  if product_code.is_empty() {
    product_code = match req.headers().get("product_code") {
      Some(p) => p.to_str().unwrap(),
      None => {
        return Ok(HttpResponse::NotFound().body("product_code not found"));
      }
    };
  }
  let id = ScriptWorkerId(product_code.to_string());
  let hand_port = PORT_TABLE.read().unwrap();
  let WorkerPort(port) = match hand_port.get(&id) {
    Some(p) => p,
    None => {
      return Ok(HttpResponse::NotFound().body(format!("{} service not found", product_code)));
    }
  };
  let mut new_url = Url::parse(&format!("http://127.0.0.1:{}", port)).unwrap();
  new_url.set_path(req.uri().path());
  new_url.set_query(req.uri().query());
  let forwarded_req = client.request_from(new_url.as_str(), req.head()).no_decompress();
  let forwarded_req = match peer_addr {
    Some(PeerAddr(addr)) => forwarded_req.insert_header(("x-forwarded-for", addr.ip().to_string())),
    None => forwarded_req,
  };

  let res = forwarded_req.send_stream(payload).await.map_err(error::ErrorInternalServerError)?;
  let mut client_resp = HttpResponse::build(res.status());
  for (header_name, header_value) in res.headers().iter().filter(|(h, _)| *h != "connection") {
    client_resp.insert_header((header_name.clone(), header_value.clone()));
  }
  Ok(client_resp.streaming(res))
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Res<T> {
  pub code: i32,
  pub data: T,
}

impl<T> Res<T>
where
  T: Serialize + DeserializeOwned + Clone,
{
  pub fn respond_to(self) -> HttpResponse {
    HttpResponse::Ok().content_type("application/json").body(self.to_string())
  }
}
impl<T> ToString for Res<T>
where
  T: Serialize + DeserializeOwned + Clone,
{
  fn to_string(&self) -> String {
    serde_json::to_string(self).unwrap()
  }
}
